1   /**
2    * Copyright 2014 Netflix, Inc.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.observables;
17  
18  import java.util.Iterator;
19  import java.util.NoSuchElementException;
20  import java.util.concurrent.CountDownLatch;
21  import java.util.concurrent.Future;
22  import java.util.concurrent.atomic.AtomicReference;
23  
24  import rx.Observable;
25  import rx.Subscriber;
26  import rx.Subscription;
27  import rx.functions.Action1;
28  import rx.functions.Func1;
29  import rx.internal.operators.BlockingOperatorLatest;
30  import rx.internal.operators.BlockingOperatorMostRecent;
31  import rx.internal.operators.BlockingOperatorNext;
32  import rx.internal.operators.BlockingOperatorToFuture;
33  import rx.internal.operators.BlockingOperatorToIterator;
34  import rx.internal.util.UtilityFunctions;
35  
36  /**
37   * {@code BlockingObservable} is a variety of {@link Observable} that provides blocking operators. It can be
38   * useful for testing and demo purposes, but is generally inappropriate for production applications (if you
39   * think you need to use a {@code BlockingObservable} this is usually a sign that you should rethink your
40   * design).
41   * <p>
42   * You construct a {@code BlockingObservable} from an {@code Observable} with {@link #from(Observable)} or
43   * {@link Observable#toBlocking()}.
44   * <p>
45   * The documentation for this interface makes use of a form of marble diagram that has been modified to
46   * illustrate blocking operators. The following legend explains these marble diagrams:
47   * <p>
48   * <img width="640" height="301" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.legend.png" alt="">
49   *
50   * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators">RxJava wiki: Blocking
51   *      Observable Operators</a>
52   * @param <T>
53   *           the type of item emitted by the {@code BlockingObservable}
54   */
55  public final class BlockingObservable<T> {
56  
57      private final Observable<? extends T> o;
58  
59      private BlockingObservable(Observable<? extends T> o) {
60          this.o = o;
61      }
62  
63      /**
64       * Converts an {@link Observable} into a {@code BlockingObservable}.
65       *
66       * @param o
67       *          the {@link Observable} you want to convert
68       * @return a {@code BlockingObservable} version of {@code o}
69       */
70      public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
71          return new BlockingObservable<T>(o);
72      }
73  
74      /**
75       * Invokes a method on each item emitted by this {@code BlockingObservable} and blocks until the Observable
76       * completes.
77       * <p>
78       * <em>Note:</em> This will block even if the underlying Observable is asynchronous.
79       * <p>
80       * <img width="640" height="330" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.forEach.png" alt="">
81       * <p>
82       * This is similar to {@link Observable#subscribe(Subscriber)}, but it blocks. Because it blocks it does not
83       * need the {@link Subscriber#onCompleted()} or {@link Subscriber#onError(Throwable)} methods. If the
84       * underlying Observable terminates with an error, rather than calling {@code onError}, this method will
85       * throw an exception.
86       *
87       * @param onNext
88       *            the {@link Action1} to invoke for each item emitted by the {@code BlockingObservable}
89       * @throws RuntimeException
90       *             if an error occurs
91       * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a>
92       */
93      public void forEach(final Action1<? super T> onNext) {
94          final CountDownLatch latch = new CountDownLatch(1);
95          final AtomicReference<Throwable> exceptionFromOnError = new AtomicReference<Throwable>();
96  
97          /*
98           * Use 'subscribe' instead of 'unsafeSubscribe' for Rx contract behavior
99           * as this is the final subscribe in the chain.
100          */
101         Subscription subscription = o.subscribe(new Subscriber<T>() {
102             @Override
103             public void onCompleted() {
104                 latch.countDown();
105             }
106 
107             @Override
108             public void onError(Throwable e) {
109                 /*
110                  * If we receive an onError event we set the reference on the
111                  * outer thread so we can git it and throw after the
112                  * latch.await().
113                  * 
114                  * We do this instead of throwing directly since this may be on
115                  * a different thread and the latch is still waiting.
116                  */
117                 exceptionFromOnError.set(e);
118                 latch.countDown();
119             }
120 
121             @Override
122             public void onNext(T args) {
123                 onNext.call(args);
124             }
125         });
126         // block until the subscription completes and then return
127         try {
128             latch.await();
129         } catch (InterruptedException e) {
130             subscription.unsubscribe();
131             // set the interrupted flag again so callers can still get it
132             // for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780
133             Thread.currentThread().interrupt();
134             // using Runtime so it is not checked
135             throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
136         }
137 
138         if (exceptionFromOnError.get() != null) {
139             if (exceptionFromOnError.get() instanceof RuntimeException) {
140                 throw (RuntimeException) exceptionFromOnError.get();
141             } else {
142                 throw new RuntimeException(exceptionFromOnError.get());
143             }
144         }
145     }
146 
147     /**
148      * Returns an {@link Iterator} that iterates over all items emitted by this {@code BlockingObservable}.
149      * <p>
150      * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.getIterator.png" alt="">
151      *
152      * @return an {@link Iterator} that can iterate over the items emitted by this {@code BlockingObservable}
153      * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
154      */
155     public Iterator<T> getIterator() {
156         return BlockingOperatorToIterator.toIterator(o);
157     }
158 
159     /**
160      * Returns the first item emitted by this {@code BlockingObservable}, or throws
161      * {@code NoSuchElementException} if it emits no items.
162      *
163      * @return the first item emitted by this {@code BlockingObservable}
164      * @throws NoSuchElementException
165      *             if this {@code BlockingObservable} emits no items
166      * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
167      */
168     public T first() {
169         return blockForSingle(o.first());
170     }
171 
172     /**
173      * Returns the first item emitted by this {@code BlockingObservable} that matches a predicate, or throws
174      * {@code NoSuchElementException} if it emits no such item.
175      *
176      * @param predicate
177      *            a predicate function to evaluate items emitted by this {@code BlockingObservable}
178      * @return the first item emitted by this {@code BlockingObservable} that matches the predicate
179      * @throws NoSuchElementException
180      *             if this {@code BlockingObservable} emits no such items
181      * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
182      */
183     public T first(Func1<? super T, Boolean> predicate) {
184         return blockForSingle(o.first(predicate));
185     }
186 
187     /**
188      * Returns the first item emitted by this {@code BlockingObservable}, or a default value if it emits no
189      * items.
190      *
191      * @param defaultValue
192      *            a default value to return if this {@code BlockingObservable} emits no items
193      * @return the first item emitted by this {@code BlockingObservable}, or the default value if it emits no
194      *         items
195      * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
196      */
197     public T firstOrDefault(T defaultValue) {
198         return blockForSingle(o.map(UtilityFunctions.<T>identity()).firstOrDefault(defaultValue));
199     }
200 
201     /**
202      * Returns the first item emitted by this {@code BlockingObservable} that matches a predicate, or a default
203      * value if it emits no such items.
204      *
205      * @param defaultValue
206      *            a default value to return if this {@code BlockingObservable} emits no matching items
207      * @param predicate
208      *            a predicate function to evaluate items emitted by this {@code BlockingObservable}
209      * @return the first item emitted by this {@code BlockingObservable} that matches the predicate, or the
210      *         default value if this {@code BlockingObservable} emits no matching items
211      * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
212      */
213     public T firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
214         return blockForSingle(o.filter(predicate).map(UtilityFunctions.<T>identity()).firstOrDefault(defaultValue));
215     }
216 
217     /**
218      * Returns the last item emitted by this {@code BlockingObservable}, or throws
219      * {@code NoSuchElementException} if this {@code BlockingObservable} emits no items.
220      * <p>
221      * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.last.png" alt="">
222      *
223      * @return the last item emitted by this {@code BlockingObservable}
224      * @throws NoSuchElementException
225      *             if this {@code BlockingObservable} emits no items
226      * @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a>
227      */
228     public T last() {
229         return blockForSingle(o.last());
230     }
231 
232     /**
233      * Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or throws
234      * {@code NoSuchElementException} if it emits no such items.
235      * <p>
236      * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.last.p.png" alt="">
237      *
238      * @param predicate
239      *            a predicate function to evaluate items emitted by the {@code BlockingObservable}
240      * @return the last item emitted by the {@code BlockingObservable} that matches the predicate
241      * @throws NoSuchElementException
242      *             if this {@code BlockingObservable} emits no items
243      * @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a>
244      */
245     public T last(final Func1<? super T, Boolean> predicate) {
246         return blockForSingle(o.last(predicate));
247     }
248 
249     /**
250      * Returns the last item emitted by this {@code BlockingObservable}, or a default value if it emits no
251      * items.
252      * <p>
253      * <img width="640" height="310" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.png" alt="">
254      *
255      * @param defaultValue
256      *            a default value to return if this {@code BlockingObservable} emits no items
257      * @return the last item emitted by the {@code BlockingObservable}, or the default value if it emits no
258      *         items
259      * @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a>
260      */
261     public T lastOrDefault(T defaultValue) {
262         return blockForSingle(o.map(UtilityFunctions.<T>identity()).lastOrDefault(defaultValue));
263     }
264 
265     /**
266      * Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or a default
267      * value if it emits no such items.
268      * <p>
269      * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.p.png" alt="">
270      *
271      * @param defaultValue
272      *            a default value to return if this {@code BlockingObservable} emits no matching items
273      * @param predicate
274      *            a predicate function to evaluate items emitted by this {@code BlockingObservable}
275      * @return the last item emitted by this {@code BlockingObservable} that matches the predicate, or the
276      *         default value if it emits no matching items
277      * @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a>
278      */
279     public T lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
280         return blockForSingle(o.filter(predicate).map(UtilityFunctions.<T>identity()).lastOrDefault(defaultValue));
281     }
282 
283     /**
284      * Returns an {@link Iterable} that always returns the item most recently emitted by this
285      * {@code BlockingObservable}.
286      * <p>
287      * <img width="640" height="490" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.mostRecent.png" alt="">
288      *
289      * @param initialValue
290      *            the initial value that the {@link Iterable} sequence will yield if this
291      *            {@code BlockingObservable} has not yet emitted an item
292      * @return an {@link Iterable} that on each iteration returns the item that this {@code BlockingObservable}
293      *         has most recently emitted
294      * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
295      */
296     public Iterable<T> mostRecent(T initialValue) {
297         return BlockingOperatorMostRecent.mostRecent(o, initialValue);
298     }
299 
300     /**
301      * Returns an {@link Iterable} that blocks until this {@code BlockingObservable} emits another item, then
302      * returns that item.
303      * <p>
304      * <img width="640" height="490" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.next.png" alt="">
305      *
306      * @return an {@link Iterable} that blocks upon each iteration until this {@code BlockingObservable} emits
307      *         a new item, whereupon the Iterable returns that item
308      * @see <a href="http://reactivex.io/documentation/operators/takelast.html">ReactiveX documentation: TakeLast</a>
309      */
310     public Iterable<T> next() {
311         return BlockingOperatorNext.next(o);
312     }
313 
314     /**
315      * Returns an {@link Iterable} that returns the latest item emitted by this {@code BlockingObservable},
316      * waiting if necessary for one to become available.
317      * <p>
318      * If this {@code BlockingObservable} produces items faster than {@code Iterator.next} takes them,
319      * {@code onNext} events might be skipped, but {@code onError} or {@code onCompleted} events are not.
320      * <p>
321      * Note also that an {@code onNext} directly followed by {@code onCompleted} might hide the {@code onNext}
322      * event.
323      *
324      * @return an Iterable that always returns the latest item emitted by this {@code BlockingObservable}
325      * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
326      */
327     public Iterable<T> latest() {
328         return BlockingOperatorLatest.latest(o);
329     }
330 
331     /**
332      * If this {@code BlockingObservable} completes after emitting a single item, return that item, otherwise
333      * throw a {@code NoSuchElementException}.
334      * <p>
335      * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.png" alt="">
336      *
337      * @return the single item emitted by this {@code BlockingObservable}
338      * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
339      */
340     public T single() {
341         return blockForSingle(o.single());
342     }
343 
344     /**
345      * If this {@code BlockingObservable} completes after emitting a single item that matches a given predicate,
346      * return that item, otherwise throw a {@code NoSuchElementException}.
347      * <p>
348      * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.p.png" alt="">
349      *
350      * @param predicate
351      *            a predicate function to evaluate items emitted by this {@link BlockingObservable}
352      * @return the single item emitted by this {@code BlockingObservable} that matches the predicate
353      * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
354      */
355     public T single(Func1<? super T, Boolean> predicate) {
356         return blockForSingle(o.single(predicate));
357     }
358 
359     /**
360      * If this {@code BlockingObservable} completes after emitting a single item, return that item; if it emits
361      * more than one item, throw an {@code IllegalArgumentException}; if it emits no items, return a default
362      * value.
363      * <p>
364      * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.singleOrDefault.png" alt="">
365      *
366      * @param defaultValue
367      *            a default value to return if this {@code BlockingObservable} emits no items
368      * @return the single item emitted by this {@code BlockingObservable}, or the default value if it emits no
369      *         items
370      * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
371      */
372     public T singleOrDefault(T defaultValue) {
373         return blockForSingle(o.map(UtilityFunctions.<T>identity()).singleOrDefault(defaultValue));
374     }
375 
376     /**
377      * If this {@code BlockingObservable} completes after emitting a single item that matches a predicate,
378      * return that item; if it emits more than one such item, throw an {@code IllegalArgumentException}; if it
379      * emits no items, return a default value.
380      * <p>
381      * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.singleOrDefault.p.png" alt="">
382      *
383      * @param defaultValue
384      *            a default value to return if this {@code BlockingObservable} emits no matching items
385      * @param predicate
386      *            a predicate function to evaluate items emitted by this {@code BlockingObservable}
387      * @return the single item emitted by the {@code BlockingObservable} that matches the predicate, or the
388      *         default value if no such items are emitted
389      * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
390      */
391     public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
392         return blockForSingle(o.filter(predicate).map(UtilityFunctions.<T>identity()).singleOrDefault(defaultValue));
393     }
394 
395     /**
396      * Returns a {@link Future} representing the single value emitted by this {@code BlockingObservable}.
397      * <p>
398      * If {@link BlockingObservable} emits more than one item, {@link java.util.concurrent.Future} will receive an
399      * {@link java.lang.IllegalArgumentException}. If {@link BlockingObservable} is empty, {@link java.util.concurrent.Future}
400      * will receive an {@link java.util.NoSuchElementException}.
401      * <p>
402      * If the {@code BlockingObservable} may emit more than one item, use {@code Observable.toList().toBlocking().toFuture()}.
403      * <p>
404      * <img width="640" height="395" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toFuture.png" alt="">
405      *
406      * @return a {@link Future} that expects a single item to be emitted by this {@code BlockingObservable}
407      * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
408      */
409     public Future<T> toFuture() {
410         return BlockingOperatorToFuture.toFuture(o);
411     }
412 
413     /**
414      * Converts this {@code BlockingObservable} into an {@link Iterable}.
415      * <p>
416      * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toIterable.png" alt="">
417      *
418      * @return an {@link Iterable} version of this {@code BlockingObservable}
419      * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
420      */
421     public Iterable<T> toIterable() {
422         return new Iterable<T>() {
423             @Override
424             public Iterator<T> iterator() {
425                 return getIterator();
426             }
427         };
428     }
429 
430     /**
431      * Helper method which handles the actual blocking for a single response.
432      * <p>
433      * If the {@link Observable} errors, it will be thrown right away.
434      *
435      * @return the actual item
436      */
437     private T blockForSingle(final Observable<? extends T> observable) {
438         final AtomicReference<T> returnItem = new AtomicReference<T>();
439         final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
440         final CountDownLatch latch = new CountDownLatch(1);
441 
442         Subscription subscription = observable.subscribe(new Subscriber<T>() {
443             @Override
444             public void onCompleted() {
445                 latch.countDown();
446             }
447 
448             @Override
449             public void onError(final Throwable e) {
450                 returnException.set(e);
451                 latch.countDown();
452             }
453 
454             @Override
455             public void onNext(final T item) {
456                 returnItem.set(item);
457             }
458         });
459 
460         try {
461             latch.await();
462         } catch (InterruptedException e) {
463             subscription.unsubscribe();
464             Thread.currentThread().interrupt();
465             throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
466         }
467 
468         if (returnException.get() != null) {
469             if (returnException.get() instanceof RuntimeException) {
470                 throw (RuntimeException) returnException.get();
471             } else {
472                 throw new RuntimeException(returnException.get());
473             }
474         }
475 
476         return returnItem.get();
477     }
478 }